package com.bieber.common.compute;

import com.bieber.common.Constants;
import com.bieber.common.model.Task;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.compute.*;
import org.jetbrains.annotations.Nullable;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Ignite compute task that splits the persons described by a {@link Task} into
 * fixed-size batches, runs one {@link ComputePersonOrderJob} per batch on the
 * cluster, and sums the integer results returned by the jobs.
 *
 * Created by bieber on 2015/9/5.
 */
public class ComputePersonOrderTask implements ComputeTask<Task,Integer> {

    /** Maximum number of persons handed to a single job. */
    private static final int PER_JOB_PERSON_SIZE = 100;

    /**
     * Creates one job per batch of up to {@link #PER_JOB_PERSON_SIZE} persons and
     * assigns the jobs to the given nodes in contiguous runs (the first node gets
     * the first run of batches, the second node the next run, and so on).
     *
     * <p>Exactly {@code ceil(personSize / PER_JOB_PERSON_SIZE)} jobs are created, so no
     * job starts at or beyond {@code task.getPersonSize()}, and the final job is sized
     * to the persons that actually remain.
     *
     * @param nodes nodes available for this task execution
     * @param task  task argument supplying the person count and the order cache name
     * @return mapping of each created job to the node it should run on; empty when
     *         the task's person size is not positive
     * @throws IgniteException if {@code nodes} is {@code null} or empty
     */
    @Nullable
    @Override
    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> nodes, Task task) throws IgniteException {
        if (nodes == null || nodes.isEmpty()) {
            // Fail with a descriptive error instead of a division by zero below.
            throw new IgniteException("No cluster nodes available to map person order jobs onto.");
        }

        Map<ComputeJob,ClusterNode> nodeMap = new HashMap<ComputeJob, ClusterNode>();
        int personSize = task.getPersonSize();
        if (personSize <= 0) {
            // Nothing to process; a negative size must not produce jobs either.
            return nodeMap;
        }

        int nodeSize = nodes.size();
        int jobSize = ceilDiv(personSize, PER_JOB_PERSON_SIZE);
        // Upper bound of jobs per node; a node receives a contiguous run of this many batches.
        int perNodeJobs = ceilDiv(jobSize, nodeSize);

        for (int jobIndex = 0; jobIndex < jobSize; jobIndex++) {
            int startIndex = jobIndex * PER_JOB_PERSON_SIZE;
            // The last batch may be partial: never reach past personSize.
            // NOTE(review): assumes the job's second constructor argument is the number
            // of persons to process starting at startIndex - confirm against ComputePersonOrderJob.
            int batchSize = Math.min(PER_JOB_PERSON_SIZE, personSize - startIndex);
            // jobIndex / perNodeJobs is always < nodeSize because perNodeJobs >= jobSize / nodeSize.
            ClusterNode node = nodes.get(jobIndex / perNodeJobs);
            ComputePersonOrderJob job = new ComputePersonOrderJob(startIndex, batchSize, task.getOrderCacheName());
            nodeMap.put(job, node);
        }
        return nodeMap;
    }

    /**
     * Decides how to proceed after a single job result arrives.
     *
     * <p>A result without an exception yields {@link ComputeJobResultPolicy#WAIT}.
     * A result whose exception is a {@link ComputeExecutionRejectedException}, a
     * {@link ClusterTopologyException}, or has a {@link ComputeJobFailoverException}
     * cause yields {@link ComputeJobResultPolicy#FAILOVER}. Any other exception is
     * rethrown wrapped in an {@link IgniteException}.
     *
     * @param res  the job result that was just received
     * @param rcvd all results received so far (not used here)
     * @return the policy to apply for the task
     * @throws IgniteException if the job failed with an exception that is not failed over
     */
    @Override
    public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
        IgniteException e = res.getException();
        if (e != null) {
            // Don't failover user's code errors.
            if (e instanceof ComputeExecutionRejectedException ||
                    e instanceof ClusterTopologyException ||
                    // Failover exception is always wrapped.
                    e.hasCause(ComputeJobFailoverException.class))
                return ComputeJobResultPolicy.FAILOVER;

            throw new IgniteException("Remote job threw user exception (override or implement ComputeTask.result(..) " +
                    "method if you would like to have automatic failover for this exception).", e);
        }
        return ComputeJobResultPolicy.WAIT;
    }

    /**
     * Logs every job result and sums their integer payloads.
     *
     * @param results results of all completed jobs
     * @return sum of the job results; a job that returned {@code null} contributes 0
     * @throws ClassCastException if a non-null job result is not an {@link Integer}
     */
    @Nullable
    @Override
    public Integer reduce(List<ComputeJobResult> results) throws IgniteException {
        int totalSize = 0;
        for (ComputeJobResult res : results) {
            Object data = res.getData();
            Constants.COMMON_LOGGER.info("Node [{}] had done [{}]", res.getNode(), data);
            // Guard against a null payload, which would otherwise fail on unboxing.
            if (data != null) {
                totalSize += (Integer) data;
            }
        }
        return totalSize;
    }

    /** Integer division of two positive values, rounded up. */
    private static int ceilDiv(int dividend, int divisor) {
        int quotient = dividend / divisor;
        return dividend % divisor != 0 ? quotient + 1 : quotient;
    }
}
